/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2019 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

//
// $Id: batchprimitiveprocessor-jl.cpp 9705 2013-07-17 20:06:07Z pleblanc $
// C++ Implementation: batchprimitiveprocessor
//
// Description:
//
//
// Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
//
// Copyright: See COPYING file that comes with this distribution
//
//

#include <unistd.h>
//#define NDEBUG
#include <cassert>
#include <stdexcept>
#include <iostream>
#include <set>
using namespace std;

#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
namespace bu = boost::uuids;

#include "calpontsystemcatalog.h"
using namespace execplan;

#include "bpp-jl.h"
#include "jlf_common.h"
using namespace messageqcpp;
using namespace rowgroup;
using namespace joiner;

#define XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX

namespace joblist
{
BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm)
 : ot(BPS_ELEMENT_TYPE)
 , needToSetLBID(true)
 , count(1)
 , baseRid(0)
 , ridCount(0)
 , needStrValues(false)
 , wideColumnsWidths(0)
 , filterCount(0)
 , projectCount(0)
 , needRidsAtDelivery(false)
 , ridMap(0)
 , sendValues(false)
 , sendAbsRids(false)
 , _hasScan(false)
 , LBIDTrace(false)
 , tupleLength(0)
 , status(0)
 , valueColumn(0)
 , sendTupleJoinRowGroupData(false)
 , bop(BOP_AND)
 , forHJ(false)
 , threadCount(1)
 , fJoinerChunkSize(rm->getJlJoinerChunkSize())
 , hasSmallOuterJoin(false)
 , _priority(1)
{
  PMJoinerCount = 0;
  uuid = bu::nil_generator()();
}

BatchPrimitiveProcessorJL::~BatchPrimitiveProcessorJL()
{
}

void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan,
                                              vector<BRM::LBID_t> lastScannedLBID,
                                              bool hasAuxCol,
                                              const std::vector<BRM::EMEntry>& extentsAux,
                                              execplan::CalpontSystemCatalog::OID oidAux)
{
  SCommand cc;

  tableOID = scan.tableOid();
  cc.reset(new ColumnCommandJL(scan, lastScannedLBID, hasAuxCol, extentsAux, oidAux));
  cc->setBatchPrimitiveProcessor(this);
  cc->setQueryUuid(scan.queryUuid());
  cc->setStepUuid(uuid);
  filterSteps.push_back(cc);
  filterCount++;
  _hasScan = true;
  if (utils::isWide(cc->getWidth()))
    wideColumnsWidths |= cc->getWidth();
  idbassert(sessionID == scan.sessionId());
}

void BatchPrimitiveProcessorJL::addFilterStep(const PseudoColStep& pcs)
{
  SCommand cc;

  tableOID = pcs.tableOid();
  cc.reset(new PseudoCCJL(pcs));
  cc->setBatchPrimitiveProcessor(this);
  cc->setQueryUuid(pcs.queryUuid());
  cc->setStepUuid(uuid);
  filterSteps.push_back(cc);
  filterCount++;
  idbassert(sessionID == pcs.sessionId());
}

void BatchPrimitiveProcessorJL::addFilterStep(const pColStep& step)
{
  SCommand cc;

  tableOID = step.tableOid();
  cc.reset(new ColumnCommandJL(step));
  cc->setBatchPrimitiveProcessor(this);
  cc->setQueryUuid(step.queryUuid());
  cc->setStepUuid(uuid);
  filterSteps.push_back(cc);
  filterCount++;
  if (utils::isWide(cc->getWidth()))
    wideColumnsWidths |= cc->getWidth();
  idbassert(sessionID == step.sessionId());
}

void BatchPrimitiveProcessorJL::addFilterStep(const pDictionaryStep& step)
{
  SCommand cc;

  tableOID = step.tableOid();

  if (filterCount == 0)
  {
    sendAbsRids = true;
    sendValues = true;
    absRids.reset(new uint64_t[8192]);
  }

  cc.reset(new DictStepJL(step));
  cc->setBatchPrimitiveProcessor(this);
  cc->setQueryUuid(step.queryUuid());
  cc->setStepUuid(uuid);

#if defined(XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX)
  if (filterSteps.size() > 0)
  {
    size_t stepsIndex = filterSteps.size() - 1;
    SCommand prevCC = filterSteps[stepsIndex];
    ColumnCommandJL* pcc = dynamic_cast<ColumnCommandJL*>(prevCC.get());
    DictStepJL* ccc = dynamic_cast<DictStepJL*>(cc.get());
    if (pcc && ccc)
    {
      filterSteps[stepsIndex].reset(
          new ColumnCommandJL(*pcc, *ccc));  // column command will use same filters.
    }
  }
#endif
  filterSteps.push_back(cc);
  filterCount++;
  needStrValues = true;
  idbassert(sessionID == step.sessionId());
}

void BatchPrimitiveProcessorJL::addFilterStep(const FilterStep& step)
{
  SCommand cc;

  tableOID = step.tableOid();
  cc.reset(new FilterCommandJL(step));
  cc->setBatchPrimitiveProcessor(this);
  cc->setQueryUuid(step.queryUuid());
  cc->setStepUuid(uuid);
  filterSteps.push_back(cc);
  filterCount++;
  idbassert(sessionID == step.sessionId());
}

void BatchPrimitiveProcessorJL::addProjectStep(const PseudoColStep& step)
{
  SCommand cc;

  cc.reset(new PseudoCCJL(step));
  cc->setBatchPrimitiveProcessor(this);
  cc->setTupleKey(step.tupleId());
  cc->setQueryUuid(step.queryUuid());
  cc->setStepUuid(uuid);
  projectSteps.push_back(cc);
  colWidths.push_back(cc->getWidth());
  tupleLength += cc->getWidth();
  projectCount++;
  idbassert(sessionID == step.sessionId());
}

void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& step)
{
  SCommand cc;

  cc.reset(new ColumnCommandJL(step));
  cc->setBatchPrimitiveProcessor(this);
  cc->setTupleKey(step.tupleId());
  cc->setQueryUuid(step.queryUuid());
  cc->setStepUuid(uuid);
  projectSteps.push_back(cc);
  colWidths.push_back(cc->getWidth());
  tupleLength += cc->getWidth();
  projectCount++;
  if (utils::isWide(cc->getWidth()))
    wideColumnsWidths |= cc->getWidth();
  idbassert(sessionID == step.sessionId());
}

void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& step)
{
  SCommand cc;

  cc.reset(new PassThruCommandJL(step));
  cc->setBatchPrimitiveProcessor(this);
  cc->setTupleKey(step.tupleId());
  cc->setQueryUuid(step.queryUuid());
  cc->setStepUuid(uuid);
  projectSteps.push_back(cc);
  colWidths.push_back(cc->getWidth());
  tupleLength += cc->getWidth();
  projectCount++;

  if (utils::isWide(cc->getWidth()))
    wideColumnsWidths |= cc->getWidth();

  if (filterCount == 0)
    sendValues = true;

  idbassert(sessionID == step.sessionId());
}

void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& col, const pDictionaryStep& dict)
{
  SCommand cc;

  cc.reset(new RTSCommandJL(col, dict));
  cc->setBatchPrimitiveProcessor(this);
  cc->setTupleKey(dict.tupleId());
  cc->setQueryUuid(col.queryUuid());
  cc->setStepUuid(uuid);
  projectSteps.push_back(cc);
  colWidths.push_back(cc->getWidth());
  tupleLength += cc->getWidth();
  projectCount++;
  needStrValues = true;
  idbassert(sessionID == col.sessionId());
  idbassert(sessionID == dict.sessionId());
}

void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& p, const pDictionaryStep& dict)
{
  SCommand cc;

  cc.reset(new RTSCommandJL(p, dict));
  cc->setBatchPrimitiveProcessor(this);
  cc->setTupleKey(dict.tupleId());
  cc->setQueryUuid(p.queryUuid());
  cc->setStepUuid(uuid);
  projectSteps.push_back(cc);
  colWidths.push_back(cc->getWidth());
  tupleLength += cc->getWidth();
  projectCount++;
  needStrValues = true;

  if (filterCount == 0)
  {
    sendValues = true;
    sendAbsRids = true;
    absRids.reset(new uint64_t[8192]);
  }

  idbassert(sessionID == p.sessionId());
  idbassert(sessionID == dict.sessionId());
}

#if 0
void BatchPrimitiveProcessorJL::addDeliveryStep(const DeliveryStep& ds)
{
    idbassert(sessionID == ds.sessionId());

    uint32_t i;

    templateTB = ds.fEmptyTableBand;
    tableOID = ds.fTableOID;

    tableColumnCount = templateTB.getColumnCount();
    idbassert(tableColumnCount > 0);
    tablePositions.reset(new int[tableColumnCount]);

    for (i = 0; i < tableColumnCount; i++)
        tablePositions[i] = -1;

    idbassert(projectCount <= projectSteps.size());

    /* In theory, tablePositions maps a column's table position to a projection step
     -1 means it's a null column */
    CalpontSystemCatalog::OID oid;
    int idx;

    for (i = 0; i < projectCount; i++)
    {
        oid = static_cast<CalpontSystemCatalog::OID>(projectSteps[i]->getOID());

        if (oid > 0)
        {
            idx = templateTB.find(oid);

            if (idx >= 0)
            {
                tablePositions[idx] = i;
            }
            else
            {
                cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): didn't find OID " << oid <<
                     " in tableband at pjstep idx " << i << endl;
            }
        }
        else
        {
            cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): pjstep idx " << i <<
                 " doesn't have a valid OID" << endl;
        }
    }
}
#endif

void BatchPrimitiveProcessorJL::addElementType(const ElementType& et, uint32_t dbroot)
{
  uint32_t i;
  // 	rowCounter++;

  if (needToSetLBID)
  {
    needToSetLBID = false;

    for (i = 0; i < filterCount; ++i)
      filterSteps[i]->setLBID(et.first, dbroot);

    for (i = 0; i < projectCount; ++i)
      projectSteps[i]->setLBID(et.first, dbroot);

    baseRid = et.first & 0xffffffffffffe000ULL;
  }

  // TODO: get rid of magics
  if (sendAbsRids)
    absRids[ridCount] = et.first;
  else
  {
    relRids[ridCount] = et.first & 0x1fff;    // 8192 rows per logical block
    ridMap |= 1 << (relRids[ridCount] >> 9);  // LSB -> 0-511, MSB -> 7680-8191
  }

  if (sendValues)
  {
    // 		cout << "adding value " << et.second << endl;
    values[ridCount] = et.second;
  }

  ridCount++;
  idbassert(ridCount <= 8192);
}

#if 0
void BatchPrimitiveProcessorJL::setRowGroupData(const rowgroup::RowGroup& rg)
{
    uint32_t i;
    rowgroup::Row r;

    inputRG.setData(rg.getData());
    inputRG.initRow(&r);
    inputRG.getRow(0, &r);

    if (needToSetLBID)
    {
        needToSetLBID = false;

        for (i = 0; i < filterCount; ++i)
            filterSteps[i]->setLBID(r.getRid(), 1);

        for (i = 0; i < projectCount; ++i)
            projectSteps[i]->setLBID(r.getRid(), 1);

        baseRid = r.getRid() & 0xffffffffffffe000ULL;
    }
}

#endif

void BatchPrimitiveProcessorJL::addElementType(const StringElementType& et, uint32_t dbroot)
{
  if (filterCount == 0)
    throw logic_error("BPPJL::addElementType(StringElementType): doesn't work without filter steps yet");

  addElementType(ElementType(et.first, et.first), dbroot);
}

/**
 * When the output type is ElementType, the messages have the following format:
 *
 * ISMPacketHeader (ignored)
 * PrimitiveHeader (ignored)
 * --
 * If the BPP starts with a scan
 *    casual partitioning data from the scanned column
 * Base Rid for the returned logical block
 * rid count
 * (rid count)x 16-bit rids
 * (rid count)x 64-bit values
 * If there's a join performed by this BPP
 *    pre-Join rid count   (for logging purposes only)
 *    small-side new match count
 *    (match count)x ElementTypes
 * cached IO count
 * physical IO count
 * block touches
 */

// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
void BatchPrimitiveProcessorJL::getElementTypes(ByteStream& in, vector<ElementType>* out, bool* validCPData,
                                                uint64_t* lbid, int64_t* min, int64_t* max,
                                                uint32_t* cachedIO, uint32_t* physIO,
                                                uint32_t* touchedBlocks) const
{
  uint32_t i;
  uint16_t l_count;
  uint64_t l_baseRid;
  uint16_t* rids;
  uint64_t* vals;
  uint8_t* buf;
  uint64_t tmp64;
  uint8_t tmp8;

  /* skip the header */
  idbassert(in.length() > sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    in >> tmp8;
    *validCPData = (tmp8 != 0);

    if (*validCPData)
    {
      in >> *lbid;

      in >> tmp64;
      *min = (int64_t)tmp64;
      in >> tmp64;
      *max = (int64_t)tmp64;
    }
    else
      in >> *lbid;
  }

  in >> l_baseRid;
  in >> l_count;
  // 	rowsProcessed += l_count;
  idbassert(l_count <= 8192);
  out->resize(l_count);

  buf = (uint8_t*)in.buf();

  rids = (uint16_t*)buf;
  vals = (uint64_t*)(buf + (l_count << 1));
  idbassert(in.length() > (uint32_t)((l_count << 1) + (l_count << 3)));
  in.advance((l_count << 1) + (l_count << 3));

  for (i = 0; i < l_count; ++i)
  {
    (*out)[i].first = rids[i] + l_baseRid;
    // 		if (tableOID >= 3000)
    // 			idbassert((*out)[i].first > 1023);
    (*out)[i].second = vals[i];
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;
  // 	cout << "ET: got physIO=" << (int) *physIO << " cachedIO=" <<
  // 		(int) *cachedIO << " touchedBlocks=" << (int) *touchedBlocks << endl;
  idbassert(in.length() == 0);
}

/**
 * When the output type is StringElementType the messages have the following format:
 *
 * ISMPacketHeader  (ignored)
 * PrimitiveHeader  (ignored)
 * ---
 * If the BPP starts with a scan
 *     Casual partitioning data from the column scanned
 * Rid count
 * (rid count)x 64-bit absolute rids
 * (rid count)x serialized strings
 * cached IO count
 * physical IO count
 * blocks touched
 */

// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
void BatchPrimitiveProcessorJL::getStringElementTypes(ByteStream& in, vector<StringElementType>* out,
                                                      bool* validCPData, uint64_t* lbid, int64_t* min,
                                                      int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
                                                      uint32_t* touchedBlocks) const
{
  uint32_t i;
  uint16_t l_count;
  uint64_t* l_absRids;
  uint64_t tmp64;
  uint8_t tmp8;

  // 	cout << "get String ETs uniqueID\n";
  /* skip the header */
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    in >> tmp8;
    *validCPData = (tmp8 != 0);

    if (*validCPData)
    {
      in >> *lbid;
      in >> tmp64;
      *min = (int64_t)tmp64;
      in >> tmp64;
      *max = (int64_t)tmp64;
    }
    else
      in >> *lbid;
  }

  in >> l_count;
  // 	cout << "parsing " << l_count << " strings\n";
  l_absRids = (uint64_t*)in.buf();
  out->resize(l_count);
  in.advance(l_count << 3);

  for (i = 0; i < l_count; ++i)
  {
    (*out)[i].first = l_absRids[i];
    in >> (*out)[i].second;
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;
  // 	cout << "SET: got physIO=" << (int) *physIO << " cachedIO=" <<
  // 		(int) *cachedIO << " touchedBlocks=" << (int) *touchedBlocks << endl;
  idbassert(in.length() == 0);
}

/**
 * When the output type is Tuples, the input message format is the same
 * as when the output type is TableBands
 */

// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
void BatchPrimitiveProcessorJL::getTuples(messageqcpp::ByteStream& in, std::vector<TupleType>* out,
                                          bool* validCPData, uint64_t* lbid, int64_t* min, int64_t* max,
                                          uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks) const
{
  uint32_t i, j, pos, len;
  uint16_t l_rowCount;
  uint64_t l_baseRid;
  uint16_t* l_relRids;
  uint64_t absRids[8192];
  // const uint8_t* columnData[projectCount];
  const uint8_t** columnData = (const uint8_t**)alloca(projectCount * sizeof(uint8_t*));
  memset(columnData, 0, projectCount * sizeof(uint8_t*));
  const uint8_t* buf;
  // uint32_t colLengths[projectCount];
  uint32_t* colLengths = (uint32_t*)alloca(projectCount * sizeof(uint32_t));
  uint64_t tmp64;
  uint8_t tmp8;

  // 	cout << "getTuples msg is " << in.length() << " bytes\n";
  /* skip the header */
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    in >> tmp8;
    *validCPData = (tmp8 != 0);

    if (*validCPData)
    {
      in >> *lbid;
      in >> tmp64;
      *min = (int64_t)tmp64;
      in >> tmp64;
      *max = (int64_t)tmp64;
    }
    else
      in >> *lbid;
  }

  in >> l_rowCount;

  // 	cout << "read " << l_rowCount << " rows\n";

  if (needRidsAtDelivery)
  {
    in >> l_baseRid;
    l_relRids = (uint16_t*)in.buf();

    for (i = 0; i < l_rowCount; i++)
      absRids[i] = l_relRids[i] + l_baseRid;

    in.advance(l_rowCount << 1);
  }

  /* Set up pointers to the column data */
  pos = 0;
  buf = in.buf();

  for (i = 0; i < projectCount; i++)
  {
    colLengths[i] = *((uint32_t*)&buf[pos]);
    pos += 4;
    columnData[i] = &buf[pos];
    // 		cout << "column " << i << " is " << colLengths[i] << " long at " << pos << endl;
    pos += colLengths[i];
    idbassert(pos < in.length());
  }

  in.advance(pos);

  out->resize(l_rowCount);

  for (i = 0; i < l_rowCount; i++)
  {
    (*out)[i].first = absRids[i];
    (*out)[i].second = new char[tupleLength];

    for (j = 0, pos = 0; j < projectCount; j++)
    {
      idbassert(pos + colWidths[j] <= tupleLength);

      if (projectSteps[j]->getCommandType() == CommandJL::RID_TO_STRING)
      {
        len = *((uint32_t*)columnData[j]);
        columnData[j] += 4;
        memcpy(&(*out)[i].second[pos], columnData[j], len);
        pos += len;
        columnData[j] += len;

        // insert padding...
        memset(&(*out)[i].second[pos], 0, colWidths[j] - len);
        pos += colWidths[j] - len;
      }
      else
      {
        switch (colWidths[j])
        {
          case 8:
            *((uint64_t*)&(*out)[i].second[pos]) = *((uint64_t*)columnData[j]);
            columnData[j] += 8;
            pos += 8;
            break;

          case 4:
            *((uint32_t*)&(*out)[i].second[pos]) = *((uint32_t*)columnData[j]);
            columnData[j] += 4;
            pos += 4;
            break;

          case 2:
            *((uint16_t*)&(*out)[i].second[pos]) = *((uint16_t*)columnData[j]);
            columnData[j] += 2;
            pos += 4;
            break;

          case 1:
            (*out)[i].second[pos] = (char)*columnData[j];
            columnData[j]++;
            pos++;
            break;

          // TODO MCOL-641
          case 16:
            // fallthrough
          default:
            cout << "BPP::getTuples(): bad column width of " << colWidths[j] << endl;
            throw logic_error("BPP::getTuples(): bad column width");
        }
      }
    }
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;
  idbassert(in.length() == 0);
}

bool BatchPrimitiveProcessorJL::countThisMsg(messageqcpp::ByteStream& in) const
{
  const uint8_t* data = in.buf();
  uint32_t offset = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);  // skip the headers

  ISMPacketHeader* hdr = (ISMPacketHeader*)(data);

  if (_hasScan && in.length() > offset)
  {
    // This is a legitimate error message sent by PrimProc
    // so we need to return to allow upper layer to throw an error
    // if needed.
    if (hdr->Status > 0)
    {
      return true;
    }

    if (data[offset] != 0)
      offset += (data[offset + CP_FLAG_AND_LBID + 1] * 2) + CP_FLAG_AND_LBID + 1 +
                1;  // skip the CP data with wide min/max values (16/32 bytes each). we also skip
                    // cpFromDictScan flag.
    else
      offset += CP_FLAG_AND_LBID;  // skip only the "valid CP data" & LBID bytes
  }

  // Throw b/c PP throws and sends here error msg.
  // See BatchPrimitiveProcessor::writeErrorMsg() for details.
  // The inversion of the assert used here previously.
  if (in.length() <= offset)
  {
    if (hdr->Status > 0)
    {
      throw std::runtime_error(" an exception originally thrown by PrimProc: ");
    }
    throw std::runtime_error(
        " an exception because there is not enough \
data in the Primitive message from PrimProc.");
  }

  return (data[offset] != 0);
}

void BatchPrimitiveProcessorJL::deserializeAggregateResult(ByteStream* in, vector<RGData>* out) const
{
  RGData rgData;
  uint32_t count, i;

  *in >> count;

  for (i = 0; i < count; i++)
  {
    rgData.deserialize(*in, true);
    out->push_back(rgData);
  }
}

void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>* out, bool* validCPData,
                                                uint64_t* lbid, bool* fromDictScan, int128_t* min,
                                                int128_t* max, uint32_t* cachedIO, uint32_t* physIO,
                                                uint32_t* touchedBlocks, bool* countThis, uint32_t threadID,
                                                bool* hasWideColumn,
                                                const execplan::CalpontSystemCatalog::ColType& colType) const
{
  uint64_t tmp64;
  int128_t tmp128;
  uint8_t tmp8;
  RGData rgData;
  uint32_t rowCount;
  RowGroup& org = primprocRG[threadID];

  out->clear();

  if (in.length() == 0)
  {
    // done, return an empty RG
    rgData = RGData(org, 0);
    org.setData(&rgData);
    org.resetRowGroup(0);
    out->push_back(rgData);
    *cachedIO = 0;
    *physIO = 0;
    *touchedBlocks = 0;
    return;
  }

  /* skip the header */
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    in >> tmp8;
    *validCPData = (tmp8 != 0);

    if (*validCPData)
    {
      in >> *lbid;
      in >> tmp8;
      *fromDictScan = tmp8 != 0;
      in >> tmp8;
      *hasWideColumn = (tmp8 > utils::MAXLEGACYWIDTH);
      if (UNLIKELY(*hasWideColumn))
      {
        idbassert(colType.colWidth > utils::MAXLEGACYWIDTH);
        if (LIKELY(colType.isWideDecimalType()))
        {
          in >> tmp128;
          *min = tmp128;
          in >> tmp128;
          *max = tmp128;
        }
        else
        {
          std::ostringstream oss;
          oss << __func__ << " WARNING!!! Not implemented for the data type ";
          oss << colType.colDataType << std::endl;
          std::cout << oss.str();
          idbassert(false);
        }
      }
      else
      {
        in >> tmp64;
        *min = static_cast<int128_t>(tmp64);
        in >> tmp64;
        *max = static_cast<int128_t>(tmp64);
      }
    }
    else
      in >> *lbid;
  }

  in >> tmp8;
  *countThis = (tmp8 != 0);

  /* Would be cleaner to make the PM BPP's send a msg count to unify the msg formats.
   * For later... */
  if (aggregatorPM)
  {
    deserializeAggregateResult(&in, out);
    // for (uint32_t z = 0; z < out->size(); z++) {
    //	org.setData(&(*out)[z]);
    //	cout << "BPPJL: " << org.toString() << endl;
    //}
  }
  else
  {
    rgData.deserialize(in, true);
    out->push_back(rgData);
    org.setData(&rgData);

    rowCount = org.getRowCount();

    bool pmSendsMatchesAnyway =
        (hasSmallOuterJoin && *countThis && PMJoinerCount > 0 && (fe2 || aggregatorPM));

    if (!pmSendsFinalResult() || pmSendsMatchesAnyway)
    {
      std::shared_ptr<vector<uint32_t>[]> joinResults;
      uint32_t i, j;

      if (pmSendsMatchesAnyway)
      {
        uint16_t joinRowCount;
        in >> joinRowCount;
        rowCount = joinRowCount;
      }

      for (j = 0; j < PMJoinerCount; j++)
      {
        /* Reuse the result space if possible */
        joinResults = tJoiners[j]->getPMJoinArrays(threadID);

        if (joinResults.get() == NULL)
        {
          auto v = new vector<uint32_t>[8192];
          joinResults.reset(v);
          tJoiners[j]->setPMJoinResults(joinResults, threadID);
        }

        for (i = 0; i < rowCount; i++)
          deserializeInlineVector<uint32_t>(in, joinResults[i]);

        if (tJoiners[j]->smallOuterJoin())
          tJoiners[j]->markMatches(threadID, rowCount);
      }
    }
  }

  if (*countThis)
  {
    // 		cout << "grabbing io stats\n";
    in >> *cachedIO;
    in >> *physIO;
    in >> *touchedBlocks;
  }
  else
  {
    *cachedIO = 0;
    *physIO = 0;
    *touchedBlocks = 0;
  }

  idbassert(in.length() == 0);
}

RGData BatchPrimitiveProcessorJL::getErrorRowGroupData(uint16_t error) const
{
  RGData ret;
  rowgroup::RowGroup rg(projectionRG);

  ret = RGData(rg, 0);
  rg.setData(&ret);
  // rg.convertToInlineDataInPlace();
  rg.resetRowGroup(0);
  rg.setStatus(error);
  return ret;
  // return ret->rowData;
}

void BatchPrimitiveProcessorJL::reset()
{
  ridCount = 0;
  ridMap = 0;
  needToSetLBID = true;
}

void BatchPrimitiveProcessorJL::setLBID(uint64_t l, const BRM::EMEntry& scannedExtent)
{
  uint32_t i;

  dbRoot = scannedExtent.dbRoot;
  baseRid = rowgroup::convertToRid(
      scannedExtent.partitionNum, scannedExtent.segmentNum,
      scannedExtent.blockOffset / (scannedExtent.range.size * 1024),  // the extent #
      (l - scannedExtent.range.start) / scannedExtent.range.size);    // the logical block #

  /*
  cout << "got baserid=" << baseRid << " from partnum=" << scannedExtent.partitionNum
              << " segnum=" << scannedExtent.segmentNum << " extentnum=" <<
              scannedExtent.blockOffset/(scannedExtent.range.size * 1024) <<
              " blocknum=" << (l - scannedExtent.range.start)/scannedExtent.range.size << endl;
  */

  for (i = 0; i < filterCount; ++i)
    filterSteps[i]->setLBID(baseRid, dbRoot);

  for (i = 0; i < projectCount; ++i)
    projectSteps[i]->setLBID(baseRid, dbRoot);
}

string BatchPrimitiveProcessorJL::toString() const
{
  ostringstream ret;
  uint32_t i;

  ret << "BatchPrimitiveProcessorJL:" << endl;

  if (!_hasScan)
  {
    if (sendValues)
      ret << "   -- serializing values" << endl;

    if (sendAbsRids)
      ret << "   -- serializing absolute rids" << endl;
    else
      ret << "   -- serializing relative rids" << endl;
  }
  else
    ret << "   -- scan driven" << endl;

  ret << "   " << (int)filterCount << " filter steps:\n";

  for (i = 0; i < filterCount; i++)
    ret << "      " << filterSteps[i]->toString() << endl;

  ret << "   " << (int)projectCount << " projection steps:\n";

  for (i = 0; i < projectCount; i++)
    ret << "      " << projectSteps[i]->toString() << endl;

  return ret.str();
}

/**
 * The creation messages have the following format:
 *
 * ISMPacketHeader (necessary for DEC and ReadThread classes)
 * ---
 * output type (ElementType, StringElementType, etc)
 * version #
 * transaction #
 * session #  (unnecessary except for debugging)
 * step ID    ( same )
 * unique ID  (uniquely identifies the DEC queue on the UM)
 * 8-bit flags
 * if there's a join...
 *      # of elements in the Joiner object
 *       a flag whether or not to match every element (for inner joins)
 * filter step count
 * (filter count)x serialized Commands
 * projection step count
 * (projection count)x serialized Commands
 */

void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
{
  ISMPacketHeader ism;
  uint32_t i;
  uint16_t flags = 0;

  ism.Command = BATCH_PRIMITIVE_CREATE;

  bs.load((uint8_t*)&ism, sizeof(ism));
  bs << (uint8_t)ot;
  bs << (messageqcpp::ByteStream::quadbyte)txnID;
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
  bs << versionInfo;

  if (needStrValues)
    flags |= NEED_STR_VALUES;

  if (sendAbsRids)
    flags |= GOT_ABS_RIDS;

  if (sendValues)
    flags |= GOT_VALUES;

  if (LBIDTrace)
    flags |= LBID_TRACE;

  if (needRidsAtDelivery)
    flags |= SEND_RIDS_AT_DELIVERY;

  if (tJoiners.size() > 0)
    flags |= HAS_JOINER;

  if (sendTupleJoinRowGroupData)
    flags |= JOIN_ROWGROUP_DATA;

  if (wideColumnsWidths)
    flags |= HAS_WIDE_COLUMNS;

  bs << flags;

  if (wideColumnsWidths)
    bs << wideColumnsWidths;

  bs << bop;
  bs << (uint8_t)(forHJ ? 1 : 0);

  if (ot == ROW_GROUP)
  {
    bs << projectionRG;
    // 		cout << "BPPJL: projectionRG is:\n" << projectionRG.toString() << endl;

    /* F&E serialization */
    if (fe1)
    {
      bs << (uint8_t)1;
      bs << *fe1;
      bs << fe1Input;
    }
    else
      bs << (uint8_t)0;

    if (fe2)
    {
      bs << (uint8_t)1;
      bs << *fe2;
      bs << fe2Output;
    }
    else
      bs << (uint8_t)0;
  }

  /* if HAS_JOINER, send the init params */
  if (flags & HAS_JOINER)
  {
    bs << (uint32_t)maxPmJoinResultCount;
    if (ot == ROW_GROUP)
    {
      idbassert(tJoiners.size() > 0);
      bs << (messageqcpp::ByteStream::quadbyte)PMJoinerCount;

      bool atLeastOneFE = false;
#ifdef JLF_DEBUG
      cout << "PMJoinerCount = " << PMJoinerCount << endl;
#endif

      bool smallSideRGSent = false;
      for (i = 0; i < PMJoinerCount; i++)
      {
        bs << (uint32_t)tJoiners[i]->size();
        bs << tJoiners[i]->getJoinType();

        // bs << (uint64_t) tJoiners[i]->smallNullValue();

        bs << (uint8_t)tJoiners[i]->isTypelessJoin();

        if (tJoiners[i]->hasFEFilter())
        {
          atLeastOneFE = true;
#ifdef JLF_DEBUG
          cout << "serializing join FE object\n";
#endif
          bs << *tJoiners[i]->getFcnExpFilter();
        }

        if (!tJoiners[i]->isTypelessJoin())
        {
          bs << (uint64_t)tJoiners[i]->smallNullValue();
          bs << (messageqcpp::ByteStream::quadbyte)tJoiners[i]->getLargeKeyColumn();
          // cout << "large key column is " << (uint32_t) tJoiners[i]->getLargeKeyColumn() << endl;
        }
        else
        {
          serializeVector<uint32_t>(bs, tJoiners[i]->getLargeKeyColumns());
          bs << (uint32_t)tJoiners[i]->getKeyLength();
          // MCOL-4173 Notify PP if smallSide and largeSide have different column widths
          // and send smallSide RG to PP.
          bool joinHasSkewedKeyColumn = tJoiners[i]->joinHasSkewedKeyColumn();
          bs << (uint8_t)joinHasSkewedKeyColumn;
          if (!smallSideRGSent && joinHasSkewedKeyColumn)
          {
            idbassert(!smallSideRGs.empty());
            bs << smallSideRGs[0];
            serializeVector<uint32_t>(bs, tJoiners[i]->getSmallKeyColumns());
            smallSideRGSent = true;
          }
        }
      }

      if (atLeastOneFE)
        bs << joinFERG;

      if (sendTupleJoinRowGroupData)
      {
#ifdef JLF_DEBUG
        cout << "sending smallside data\n";
#endif
        serializeVector<RowGroup>(bs, smallSideRGs);
        bs << largeSideRG;
        bs << joinedRG;  // TODO: I think we can omit joinedRG if (!(fe2 || aggregatorPM))
                         // 				cout << "joined RG: " << joinedRG.toString() << endl;
      }
    }
  }

  bs << filterCount;

  for (i = 0; i < filterCount; ++i)
  {
    // 		cout << "serializing step " << i << endl;
    filterSteps[i]->createCommand(bs);
  }

  bs << projectCount;

  for (i = 0; i < projectCount; ++i)
    projectSteps[i]->createCommand(bs);

  // aggregate step only when output is row group
  if (ot == ROW_GROUP)
  {
    if (aggregatorPM.get() != NULL)
    {
      bs << (uint8_t)1;
      bs << aggregateRGPM;
      bs << *(aggregatorPM.get());
    }
    else
    {
      bs << (uint8_t)0;
    }
  }

  // decide which rowgroup is received by PrimProc
  if (ot == ROW_GROUP)
  {
    primprocRG.reset(new RowGroup[threadCount]);

    for (uint32_t i = 0; i < threadCount; i++)
      if (aggregatorPM)
        primprocRG[i] = aggregateRGPM;
      else if (fe2)
        primprocRG[i] = fe2Output;
      // This shouldn't be necessary. As of 2-17-14, PrimProc
      // will only send joined results if fe2 || aggregatorPM,
      // so it will never send back data for joinedRG.
      // else if ((flags & HAS_JOINER) && sendTupleJoinRowGroupData)
      //	primprocRG[i] = joinedRG;
      else
        primprocRG[i] = projectionRG;
  }
}

/**
 * The BPP Run messages have the following format:
 *
 * ISMPacketHeader
 *    -- Interleave field is used for DEC interleaving data
 *    -- Size field is used for the relative weight to process the message
 * -----
 * Session ID
 * Step ID
 * Unique ID
 * Sequence #
 * Iteration count
 * Rid count
 * If absolute rids are sent
 *    (rid count)x 64-bit absolute rids
 * else
 *    RID range bitmap
 *    base RID
 *    (rid count)x 16-bit relative rids
 * If values are sent
 *    (rid count)x 64-bit values
 * (filter count)x run msgs for filter Commands
 * (projection count)x run msgs for projection Commands
 */

// The deser counterpart function is BPP::resetBPP
void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isExeMgrDEC)
{
  ISMPacketHeader ism;
  uint32_t i;

  /* XXXPAT: BPPJL currently reuses the ism Size fields for other things to
  save bandwidth.  They're completely unused otherwise.  We need to throw out all unused
  fields of every defined header. */

  bs.restart();

  memset((void*)&ism, 0, sizeof(ism));
  ism.Command = BATCH_PRIMITIVE_RUN;

  // TODO: this causes the code using this ism to send on only one socket, even if more than one socket is
  // defined for each PM.
  ism.Interleave = pmNum;
  //	ism.Interleave = pmNum - 1;

  /* ... and the Size field is used for the "weight" of processing a BPP
    where 1 is one Command on one logical block. */
  ism.Size = count * (filterCount + projectCount);

  bs.append((uint8_t*)&ism, sizeof(ism));

  /* The next 4 vars are for BPPSeeder; BPP itself skips them */
  bs << sessionID;
  bs << stepID;
  bs << uniqueID;
  bs << _priority;

  // The weight is used by PrimProc thread pool algo
  uint32_t weight = calculateBPPWeight();
  bs << weight;

  bs << dbRoot;
  bs << count;
  uint8_t sentByEM = (isExeMgrDEC) ? 1 : 0;
  bs << sentByEM;

  if (_hasScan)
  {
    idbassert(ridCount == 0);
  }
  else
  {
    idbassert(ridCount > 0 && (ridMap != 0 || sendAbsRids));
  }

  bs << ridCount;

  if (sendAbsRids)
    bs.append((uint8_t*)absRids.get(), ridCount << 3);
  else
  {
    bs << ridMap;
    bs << baseRid;
    bs.append((uint8_t*)relRids, ridCount << 1);
  }

  if (sendValues)
    bs.append((uint8_t*)values, ridCount << 3);

  for (i = 0; i < filterCount; i++)
    filterSteps[i]->runCommand(bs);

  for (i = 0; i < projectCount; i++)
    projectSteps[i]->runCommand(bs);
}

void BatchPrimitiveProcessorJL::runErrorBPP(ByteStream& bs)
{
  ISMPacketHeader ism;
  bs.restart();

  memset((void*)&ism, 0, sizeof(ism));
  ism.Command = BATCH_PRIMITIVE_RUN;
  ism.Status = status;
  ism.Size = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);

  bs.append((uint8_t*)&ism, sizeof(ism));

  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
  bs << count;
  bs << ridCount;
}

void BatchPrimitiveProcessorJL::destroyBPP(ByteStream& bs) const
{
  ISMPacketHeader ism;

  // 	if (!(sessionID & 0x80000000))
  // 		cout << "step ID " << stepID << " added " << rowCounter << " rows, processed "
  // 			<< rowsProcessed << " rows" << endl;

  memset((void*)&ism, 0, sizeof(ism));
  ism.Command = BATCH_PRIMITIVE_DESTROY;

  bs.append((uint8_t*)&ism, sizeof(ism));
  /* XXXPAT: We can get rid of sessionID and stepID;
  it's there for easier debugging only */
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
}

void BatchPrimitiveProcessorJL::useJoiners(const vector<std::shared_ptr<joiner::TupleJoiner> >& j)
{
  pos = 0;
  joinerNum = 0;
  tJoiners = j;

  PMJoinerCount = 0;
  tlKeyLens.reset(new uint32_t[tJoiners.size()]);

  for (uint32_t i = 0; i < tJoiners.size(); i++)
  {
    if (tJoiners[i]->inPM())
    {
      PMJoinerCount++;
      smallSideKeys.push_back(tJoiners[i]->getSmallKeyColumns());
      smallSideRGs.push_back(tJoiners[i]->getSmallRG());

      if (tJoiners[i]->isTypelessJoin())
        tlKeyLens[i] = tJoiners[i]->getKeyLength();

      if (tJoiners[i]->hasFEFilter())
        sendTupleJoinRowGroupData = true;

      if (tJoiners[i]->smallOuterJoin())
        hasSmallOuterJoin = true;
    }
  }

  largeSideRG = tJoiners[0]->getLargeRG();

  if (aggregatorPM || fe2)
  {
    sendTupleJoinRowGroupData = true;
#ifdef JLF_DEBUG
    cout << "will send small side row data\n";
#endif
  }
  posByJoinerNum.reset(new uint32_t[PMJoinerCount]);
  memset(posByJoinerNum.get(), 0, PMJoinerCount * sizeof(uint32_t));
}

// helper fcn to interleave small side data by joinernum
bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
{
  uint i;
  // find the next joiner that still has more data to send.  Set joinerNum & pos.
  for (i = 0; i < PMJoinerCount; i++)
  {
    joinerNum = (joinerNum + 1) % PMJoinerCount;
    if (posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide()->size())
      break;
  }
  if (i == PMJoinerCount)
    return false;
  pos = posByJoinerNum[joinerNum];
  return true;
}

/* This algorithm relies on the joiners being sorted by size atm */
/* XXXPAT: Going to interleave across joiners to take advantage of the new locking env in PrimProc */
bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
{
  uint32_t size = 0, toSend, i, j;
  ISMPacketHeader ism;
  Row r;
  vector<Row::Pointer>* tSmallSide;
  joiner::TypelessData tlData;
  uint32_t smallKeyCol;
  uint32_t largeKeyCol;
  uint64_t smallkey;
  bool isNull;
  bool bSignedUnsigned;

  bool moreMsgs = pickNextJoinerNum();

  if (!moreMsgs)
  {
    /* last message */
    // 		cout << "sending last joiner msg\n";
    ism.Command = BATCH_PRIMITIVE_END_JOINER;
    bs.load((uint8_t*)&ism, sizeof(ism));
    bs << (messageqcpp::ByteStream::quadbyte)sessionID;
    bs << (messageqcpp::ByteStream::quadbyte)stepID;
    bs << uniqueID;
    return false;
  }

  memset((void*)&ism, 0, sizeof(ism));
  tSmallSide = tJoiners[joinerNum]->getSmallSide();
  size = tSmallSide->size();

#if 0
    if (joinerNum == PMJoinerCount - 1 && pos == size)
    {
        /* last message */
// 		cout << "sending last joiner msg\n";
        ism.Command = BATCH_PRIMITIVE_END_JOINER;
        bs.load((uint8_t*) &ism, sizeof(ism));
        bs << (messageqcpp::ByteStream::quadbyte)sessionID;
        bs << (messageqcpp::ByteStream::quadbyte)stepID;
        bs << uniqueID;
        pos = 0;
        return false;
    }
    else if (pos == size)
    {
        joinerNum++;
        tSmallSide = tJoiners[joinerNum]->getSmallSide();
        size = tSmallSide->size();
        pos = 0;
    }
#endif

  ism.Command = BATCH_PRIMITIVE_ADD_JOINER;
  bs.load((uint8_t*)&ism, sizeof(ism));
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;

  smallSideRGs[joinerNum].initRow(&r);

  unsigned metasize = 12;  // 12 = sizeof(struct JoinerElements)

  if (tJoiners[joinerNum]->isTypelessJoin())
    metasize = 0;

  unsigned rowunits = fJoinerChunkSize / (r.getSize() + metasize);
  toSend = std::min<unsigned int>(size - pos, rowunits);
  bs << toSend;
  bs << pos;
  bs << joinerNum;

  if (tJoiners[joinerNum]->isTypelessJoin())
  {
    utils::FixedAllocator fa(tlKeyLens[joinerNum], true);

    for (i = pos; i < pos + toSend; i++)
    {
      r.setPointer((*tSmallSide)[i]);
      isNull = false;
      bSignedUnsigned = tJoiners[joinerNum]->isSignedUnsignedJoin();

      for (j = 0; j < smallSideKeys[joinerNum].size(); j++)
      {
        isNull |= r.isNullValue(smallSideKeys[joinerNum][j]);

        if (UNLIKELY(bSignedUnsigned))
        {
          // BUG 5628 If this is a signed/unsigned join column and the sign bit is set on either side,
          // then it should not compare. Send null to PM to prevent compare
          smallKeyCol = smallSideKeys[joinerNum][j];
          largeKeyCol = tJoiners[joinerNum]->getLargeKeyColumns()[j];

          if (r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(largeKeyCol))
          {
            if (r.isUnsigned(smallKeyCol))
              smallkey = r.getUintField(smallKeyCol);
            else
              smallkey = r.getIntField(smallKeyCol);

            if (smallkey & 0x8000000000000000ULL)
            {
              isNull = true;
              break;
            }
          }
        }
      }

      if (!isNull)
      {
        tlData = makeTypelessKey(r, smallSideKeys[joinerNum], tlKeyLens[joinerNum], &fa, largeSideRG,
                                 tJoiners[joinerNum]->getLargeKeyColumns());
        if (tlData.len == 0)
        {
          isNull = true;
        }
      }
      bs << (uint8_t)isNull;
      if (!isNull)
      {
        tlData.serialize(bs);
        bs << i;
      }
    }
  }
  else
  {
#pragma pack(push, 1)
    struct JoinerElements
    {
      int64_t key;
      uint32_t value;
    } * arr;
#pragma pack(pop)
    bs.needAtLeast(toSend * sizeof(JoinerElements));
    arr = (JoinerElements*)bs.getInputPtr();

    smallKeyCol = smallSideKeys[joinerNum][0];
    bSignedUnsigned =
        r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(tJoiners[joinerNum]->getLargeKeyColumns()[0]);
    j = 0;

    for (i = pos, j = 0; i < pos + toSend; ++i, ++j)
    {
      r.setPointer((*tSmallSide)[i]);

      if (r.getColType(smallKeyCol) == CalpontSystemCatalog::LONGDOUBLE)
      {
        // Small side is a long double. Since CS can't store larger than DOUBLE,
        // we need to convert to whatever type large side is -- double or int64
        long double smallkeyld = r.getLongDoubleField(smallKeyCol);
        switch (largeSideRG.getColType(tJoiners[joinerNum]->getLargeKeyColumns()[0]))
        {
          case CalpontSystemCatalog::DOUBLE:
          case CalpontSystemCatalog::UDOUBLE:
          case CalpontSystemCatalog::FLOAT:
          case CalpontSystemCatalog::UFLOAT:
          {
            if (smallkeyld > MAX_DOUBLE || smallkeyld < MIN_DOUBLE)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else
            {
              double d = (double)smallkeyld;
              smallkey = *(int64_t*)&d;
            }
            break;
          }
          default:
          {
            if (r.isUnsigned(smallKeyCol) && smallkeyld > MAX_UBIGINT)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else if (smallkeyld > MAX_BIGINT || smallkeyld < MIN_BIGINT)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else
            {
              smallkey = (int64_t)smallkeyld;
            }
            break;
          }
        }
      }
      else if (r.isUnsigned(smallKeyCol))
        smallkey = r.getUintField(smallKeyCol);
      else
        smallkey = r.getIntField(smallKeyCol);

      // If this is a compare signed vs unsigned and the sign bit is on for this value, then all compares
      // against the large side should fall. UBIGINTEMPTYROW is not a valid value, so nothing will match.
      if (bSignedUnsigned && (smallkey & 0x8000000000000000ULL))
        smallkey = joblist::UBIGINTEMPTYROW;

      arr[j].key = (int64_t)smallkey;
      arr[j].value = i;
      // 			cout << "sending " << arr[j].key << ", " << arr[j].value << endl;
    }

    bs.advanceInputPtr(toSend * sizeof(JoinerElements));
  }

  if (sendTupleJoinRowGroupData)
  {
    RowGroup& smallSide = smallSideRGs[joinerNum];
    RGData tmpData(smallSide, toSend);
    Row tmpRow;

    smallSide.setData(&tmpData);
    smallSide.initRow(&tmpRow);
    smallSide.getRow(0, &tmpRow);

    for (i = pos; i < pos + toSend; i++, tmpRow.nextRow())
    {
      r.setPointer((*tSmallSide)[i]);
      copyRow(r, &tmpRow);
    }

    smallSide.setRowCount(toSend);
    tmpData.serialize(bs, smallSide.getDataSize());
  }

  pos += toSend;
  posByJoinerNum[joinerNum] = pos;
  return true;
}

void BatchPrimitiveProcessorJL::setProjectionRowGroup(const rowgroup::RowGroup& rg)
{
  ot = ROW_GROUP;
  projectionRG = rg;
}

void BatchPrimitiveProcessorJL::setJoinedRowGroup(const rowgroup::RowGroup& rg)
{
  joinedRG = rg;
}

void BatchPrimitiveProcessorJL::setInputRowGroup(const rowgroup::RowGroup& rg)
{
  sendAbsRids = false;
  sendValues = false;
  inputRG = rg;
}

void BatchPrimitiveProcessorJL::addAggregateStep(const rowgroup::SP_ROWAGG_PM_t& aggpm,
                                                 const rowgroup::RowGroup& argpm)
{
  aggregatorPM = aggpm;
  aggregateRGPM = argpm;

  if (tJoiners.size() > 0)
    sendTupleJoinRowGroupData = true;
}

/* OR hacks */
void BatchPrimitiveProcessorJL::setBOP(uint32_t op)
{
  bop = op;

  if (op == BOP_OR && filterCount > 1)
  {
    for (int i = 1; i < filterCount; ++i)
    {
      ColumnCommandJL* colcmd = dynamic_cast<ColumnCommandJL*>(filterSteps[i].get());

      if (colcmd != NULL)
        colcmd->scan(false);
    }
  }
}

void BatchPrimitiveProcessorJL::setForHJ(bool b)
{
  forHJ = b;
}

void BatchPrimitiveProcessorJL::setFEGroup1(boost::shared_ptr<funcexp::FuncExpWrapper> fe,
                                            const RowGroup& input)
{
  fe1 = fe;
  fe1Input = input;
}

void BatchPrimitiveProcessorJL::setFEGroup2(boost::shared_ptr<funcexp::FuncExpWrapper> fe,
                                            const RowGroup& output)
{
  fe2 = fe;
  fe2Output = output;

  if (tJoiners.size() > 0 && PMJoinerCount > 0)
    sendTupleJoinRowGroupData = true;
}

const string BatchPrimitiveProcessorJL::toMiniString() const
{
  ostringstream oss;
  int i;
  set<string> colset;
  string col;

  for (i = 0; i < filterCount; i++)
  {
    col = filterSteps[i]->getColName();
    // FilterCommandJL has two referenced columns, needs special handling.
    FilterCommandJL* filterCmd = dynamic_cast<FilterCommandJL*>(filterSteps[i].get());

    if (filterCmd == NULL)
    {
      colset.insert(col);
    }
    else
    {
      // is a FilterCommandJL
      size_t sep = col.find(',');
      colset.insert(col.substr(0, sep));

      if (sep != string::npos)
        colset.insert(col.substr(++sep));
    }
  }

  for (i = 0; i < projectCount; i++)
  {
    col = projectSteps[i]->getColName();
    colset.insert(col);
  }

  set<string>::iterator iter = colset.begin();
  oss << '(' << *iter++;

  while (iter != colset.end())
    oss << ',' << *iter++;

  oss << ')';

  return oss.str();
}

void BatchPrimitiveProcessorJL::setJoinFERG(const RowGroup& rg)
{
  joinFERG = rg;
}

void BatchPrimitiveProcessorJL::abortProcessing(ByteStream* bs)
{
  ISMPacketHeader ism;

  memset((void*)&ism, 0, sizeof(ism));
  ism.Command = BATCH_PRIMITIVE_ABORT;

  bs->load((uint8_t*)&ism, sizeof(ism));
  *bs << uniqueID;
}

void BatchPrimitiveProcessorJL::deliverStringTableRowGroup(bool b)
{
  if (aggregatorPM)
    aggregateRGPM.setUseStringTable(b);
  else if (fe2)
    fe2Output.setUseStringTable(b);
  else
    projectionRG.setUseStringTable(b);
}

}  // namespace joblist
